/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.alibaba.jstorm.window;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Tuple;

import com.alibaba.jstorm.transactional.bolt.ITransactionStatefulBoltExecutor;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An {@link backtype.storm.topology.IWindowedBolt} wrapper that does the windowing of tuples.
 */
public class TransactionalWindowedBoltExecutor extends WindowedBoltExecutor implements ITransactionStatefulBoltExecutor {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(TransactionalWindowedBoltExecutor.class);

    public TransactionalWindowedBoltExecutor(BaseWindowedBolt<Tuple> bolt) {
        super(bolt);
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        super.prepare(stormConf, context, collector);
        if (bolt instanceof TransactionalWindowedBolt)
            ((TransactionalWindowedBolt) bolt).createState(context);
    }

    @SuppressWarnings("unchecked")
    @Override
    public void initState(Object userState) {
        // init windows, triggers, user states
        List<Object> stateList = (List<Object>) userState;
        if (stateList != null) {
            // current watermark
            this.currentWatermark = (Long) stateList.get(0);
            this.userWindowStates = (ConcurrentHashMap<TimeWindow, Object>) stateList.get(1);
            this.accuUserWindowStates = (ConcurrentHashMap<TimeWindow, Object>) stateList.get(2);
            this.userWindowToStateWindow = (ConcurrentHashMap<TimeWindow, TimeWindow>) stateList.get(3);

            for (TimeWindow window : this.userWindowStates.keySet()) {
                createTriggerForWindow(windowAssigner, window, windowToTriggers);
            }
            for (TimeWindow window : this.accuUserWindowStates.keySet()) {
                createTriggerForWindow(stateWindowAssigner, window, windowToTriggers);
            }

            // delete timers that don't belong to current active windows
            if (timeCharacteristic == TimeCharacteristic.EVENT_TIME) {
                for (TimeWindow window : this.eventTimeTimerFutures.keySet()) {
                    if (!this.userWindowStates.containsKey(window)) {
                        windowContext.deleteEventTimeTimer(window);
                        windowToTriggers.remove(window);
                    }
                    windowContext.registerEventTimeTimer(window.getEnd(), window);
                }
            } else if (timeCharacteristic == TimeCharacteristic.PROCESSING_TIME ||
                    timeCharacteristic == TimeCharacteristic.INGESTION_TIME) {
                for (TimeWindow window : this.processingTimeTimerFutures.keySet()) {
                    if (!this.userWindowStates.containsKey(window)) {
                        windowContext.deleteProcessingTimeTimer(window);
                        windowToTriggers.remove(window);
                    }
                    windowContext.registerProcessingTimeTimer(window.getEnd(), window);
                }
            }
        }

        if (stateOperator != null) {
            Object state = stateList != null ? getUserState(stateList) : null;
            stateOperator.initState(state);
        }
    }

    @Override
    public Object finishBatch(long batchId) {
        List<Object> stateList = new ArrayList<>();
        stateList.add(this.currentWatermark);
        stateList.add(this.userWindowStates);
        stateList.add(this.accuUserWindowStates);
        stateList.add(this.userWindowToStateWindow);

        // save windows, triggers, user states
        if (stateOperator != null) {
            Object userState = stateOperator.finishBatch(batchId);
            stateList.add(userState);
        }

        return stateList;
    }

    // if user defines his own state operator, let user handle/save the state
    // otherwise we pass the whole state (generated by calling finishBatch)
    // to topology master for persistence
    @SuppressWarnings("unchecked")
    @Override
    public Object commit(long batchId, Object state) {
        List<Object> stateList = (List<Object>) state;
        if (stateOperator != null) {
            Object commitState = stateOperator.commit(batchId, stateList);
            stateList.add(commitState);
        }
        return stateList;
    }

    @Override
    public void rollBack(Object userState) {
        if (stateOperator != null) {
            Object state = userState != null ? getUserState((List<Object>) userState) : null;
            stateOperator.rollBack(state);
        }

        clearStates();
        initState(userState);
    }

    @Override
    public void ackCommit(long batchId, long timeStamp) {
        if (stateOperator != null) {
            stateOperator.ackCommit(batchId, timeStamp);
        }
        LOG.debug("batch {} has been processed at {}", batchId, timeStamp);
    }

    private Object getUserState(List<Object> stateList) {
        // last item is used for user state by default
        int size = stateList.size();
        return stateList.get(size - 1);
    }

    private void clearStates() {
        this.userWindowStates.clear();
        this.accuUserWindowStates.clear();
        this.userWindowToStateWindow.clear();
        this.windowToTriggers.clear();
    }
}
